# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys, subprocess, math, copy
import itertools as it
import numpy as np
from xml.etree import cElementTree as ElementTree
from abc import abstractmethod, ABCMeta
from hysop import vprint
from hysop.backend import __HAS_OPENCL_BACKEND__
from hysop.tools.henum import EnumFactory
from hysop.tools.decorators import requires_cmd
from hysop.tools.contexts import printoptions
from hysop.tools.string_utils import prepend, camel2snake
from hysop.tools.units import bytes2str
from hysop.tools.io_utils import IO
from hysop.tools.cache import load_data_from_cache, update_cache, machine_id
from hysop.backend.hardware.pci_ids import PCIIds
from hysop.core.mpi import is_multihost, interhost_comm, host_rank
class TopologyObject(metaclass=ABCMeta):
    """
    XML parser base to parse lstopo (hardware info) xml output.
    See hwloc(7) and lstopo(1) man.

    An instance wraps one XML element. ``<object>`` elements have their XML
    attributes decoded into ``self._attributes``; the root ``<topology>``
    element has no attribute dictionary (``self._attributes`` is ``None``).
    Children of the element are dispatched by tag to the ``_parse_*`` hooks,
    then ``_post_init`` is called.

    Subclasses must implement ``_parsed_type`` (the expected value of the
    XML ``type`` attribute) and ``_parse_object`` (handling of nested
    ``<object>`` children).
    """

    # Base indentation unit used by `indent`.
    _print_indent = " " * 2

    def __init__(self, parent, element, pciids=None):
        """
        Parse `element` and all of its children.

        Parameters
        ----------
        parent:
            Enclosing topology object. Its ``pciids`` attribute is used when
            `pciids` is falsy, so it must not be None in that case.
        element:
            XML element whose tag is either ``"topology"`` or ``"object"``.
        pciids:
            PCI id database propagated down the tree.

        Raises
        ------
        ValueError
            If the element tag is neither ``"topology"`` nor ``"object"``,
            or if a child tag or attribute key is not recognized.
        """
        self.parent = parent
        self.pciids = pciids or parent.pciids

        if element.tag == "topology":
            self._attributes = None
        elif element.tag == "object":
            self._attributes = self._parse_attributes(element.attrib)
        else:
            # The tag lives on the element; there is no local `tag` here.
            raise ValueError(f"Unknown initialization tag {element.tag}.")

        for child in element:
            self._handle_child(child)

        self._post_init()

    def indent(self, string, extra_spaces=0):
        """Prefix `string` with the class indent plus `extra_spaces` spaces via `prepend`."""
        return prepend(string, self._print_indent + " " * extra_spaces)

    def pop_attr(self, name):
        """Remove attribute `name` and return its value (KeyError if absent)."""
        return self._attributes.pop(name)

    def attributes(self):
        """Return the parsed attribute dictionary (None for the topology root)."""
        return self._attributes

    def attribute(self, name, default=None, cast=lambda x: x):
        """Return ``cast(value)`` of attribute `name`, or `default` (not cast) if absent."""
        if name in self._attributes:
            return cast(self._attributes[name])
        return default

    def update_attributes(self, attr):
        """Merge the mapping `attr` into the attribute dictionary."""
        self._attributes.update(attr)

    def cpu_set(self):
        """Return the ``cpuset`` attribute (an integer bitmask) or None if absent."""
        return self.attribute("cpuset")

    def full_cpu_set(self):
        """Return the cpuset of the closest ancestor (or self) whose parsed type is "Machine"."""
        if self._parsed_type() == "Machine":
            return self.cpu_set()
        return self.parent.full_cpu_set()

    def _physical_cores_count(self):
        """
        Return ``physical_cores_count()`` of the enclosing "Machine" object.

        NOTE(review): ``physical_cores_count`` is not defined in this class;
        presumably provided by the Machine subclass.
        """
        if self._parsed_type() == "Machine":
            return self.physical_cores_count()
        return self.parent._physical_cores_count()

    def _processing_units_count(self):
        """
        Return ``processing_units_count()`` of the enclosing "Machine" object.

        NOTE(review): ``processing_units_count`` is not defined in this class;
        presumably provided by the Machine subclass.
        """
        if self._parsed_type() == "Machine":
            return self.processing_units_count()
        return self.parent._processing_units_count()

    def cpu_mask(self):
        """
        Return a printable view of this object's cpuset.

        The result is ``"|<bits>| 0x<hex>"`` where ``<bits>`` is the binary
        mask zero-padded to the machine processing unit count, with ``0``
        drawn as ``.`` and ``1`` drawn as ``x``, and ``<hex>`` is the same
        mask in hexadecimal padded to a quarter of that width.
        """
        cpuset = self.cpu_set()
        mask_length = self._processing_units_count()
        bits = "|{0:0{length}b}|".format(cpuset, length=mask_length)
        bits = bits.replace("0", ".").replace("1", "x")
        hexa = " 0x{0:0{length}x}".format(cpuset, length=mask_length // 4)
        return bits + hexa

    def all_cpu_set(self):
        """Format the cpuset, complete, online and allowed cpusets as hexadecimal."""
        return "0x{:x} (complete=0x{:x}, online=0x{:x}, allowed=0x{:x})".format(
            self.attribute("cpuset"),
            self.attribute("complete_cpuset"),
            self.attribute("online_cpuset"),
            self.attribute("allowed_cpuset"),
        )

    def os_index(self):
        """Return the ``os_index`` attribute or None if absent."""
        return self.attribute("os_index")

    def print_attributes(self):
        """Print every parsed attribute as ``key -> value`` on standard output."""
        print(f"{self.__class__.__name__} attributes:")
        for k, v in self.attributes().items():
            print(f" {k} -> {v}")
        print()

    def _post_init(self):
        """Hook called once the element and its children are parsed (no-op here)."""
        pass

    def _handle_child(self, child):
        """
        Dispatch a child XML element to the matching ``_parse_*`` method.

        Raises ValueError for any tag other than ``info``, ``object``,
        ``distances`` or ``page_type``.
        """
        tag = child.tag
        attr = child.attrib
        if tag == "info":
            self._parse_info(attr)
        elif tag == "object":
            self._parse_object(child)
        elif tag == "distances":
            self._parse_distances(child)
        elif tag == "page_type":
            self._parse_page_type(attr)
        else:
            raise ValueError(f"Unknown tag {tag}.")

    @abstractmethod
    def _parsed_type(self):
        """Return the value of the XML ``type`` attribute this class parses."""
        pass

    @abstractmethod
    def _parse_object(self, it):
        """Handle a nested ``<object>`` element `it`."""
        pass

    def _parse_attributes(self, attributes):
        """
        Decode the XML attributes of an ``<object>`` element.

        The ``type`` entry is popped from `attributes` (the mapping is
        modified in place) and checked against ``_parsed_type()``. Remaining
        keys are converted as follows:

        * keys containing ``cpuset`` or ``nodeset``: comma separated
          hexadecimal chunks folded into one integer, 32 bits per chunk,
          first chunk most significant, empty chunks counting as zero;
        * ``local_memory``, ``os_index``, ``osdev_type``: ``int``;
        * ``pci_link_speed``: ``float``;
        * ``bridge_pci``, ``bridge_type``, ``depth``, ``pci_type``,
          ``pci_busid``, ``name``: stripped string.

        Raises ValueError on a type mismatch or on any other key.
        """
        info = {}
        _type = attributes.pop("type")
        if _type != self._parsed_type():
            msg = "Type '{}' does not match parsed type '{}'."
            msg = msg.format(_type, self._parsed_type())
            raise ValueError(msg)

        for k, v in attributes.items():
            if ("cpuset" in k) or ("nodeset" in k):
                bitmap = 0
                for chunk in v.split(","):
                    # Shift what was read so far and append the next 32-bit word.
                    bitmap = (bitmap << 32) | (int(chunk, 16) if chunk != "" else 0)
                info[k] = bitmap
            elif k in ("local_memory", "os_index", "osdev_type"):
                info[k] = int(v)
            elif k == "pci_link_speed":
                info[k] = float(v)
            elif k in (
                "bridge_pci",
                "bridge_type",
                "depth",
                "pci_type",
                "pci_busid",
                "name",
            ):
                info[k] = v.strip()
            else:
                raise ValueError(f"Unknown key {k} with value {v}.")
        return info

    def _parse_info(self, info):
        """Store an ``<info name=... value=...>`` entry under a snake_case key with "DMI" removed."""
        name = info["name"]
        value = info["value"]
        name = camel2snake(name.replace("DMI", ""))
        self._attributes[name] = value

    def _parse_page_type(self, page_type):
        """Ignore ``<page_type>`` elements."""
        pass

    def _parse_distances(self, child):
        """
        Parse a ``<distances>`` element into a square float32 matrix.

        The ``nbobjs`` attribute gives the matrix order; every child must be
        a ``<latency>`` element carrying a ``value`` attribute, and exactly
        ``nbobjs * nbobjs`` of them are expected. The matrix is stored under
        the ``"distances"`` attribute.
        """
        distances = child.attrib
        nbobjs = int(distances["nbobjs"])
        assert nbobjs > 1
        values = []
        for v in child:
            assert v.tag == "latency"
            values.append(v.attrib["value"])
        assert len(values) == nbobjs * nbobjs
        self._attributes["distances"] = np.reshape(
            np.asarray(values, dtype=np.float32),
            (
                nbobjs,
                nbobjs,
            ),
        )
class HardwareStatistics(metaclass=ABCMeta):
    """
    Abstract base for hardware statistics containers.

    Provides small formatting helpers on top of numpy reductions and
    requires subclasses to implement `to_string` and `__iadd__`.
    """

    def _minmax(self, values, op=lambda x: x, dtype=np.int32):
        """
        Return ``"mean=..., min=..., max=..."`` for `values`.

        Each reduction result is cast to `dtype` and passed through `op`
        before being formatted.
        """
        mean_, lowest, highest = (
            op(reduce(values).astype(dtype)) for reduce in (np.mean, np.min, np.max)
        )
        return f"mean={mean_}, min={lowest}, max={highest}"

    def _total_minmax(self, values, op=lambda x: x, dtype=np.int32):
        """Return the sum of `values` left-aligned on 10 columns, followed by `_minmax` in parentheses."""
        total = op(np.sum(values).astype(dtype))
        summary = self._minmax(values=values, op=op, dtype=dtype)
        return f"{total:<10} ({summary})"

    def _pct(self, values):
        """Return ``"<sum>/<size> (<percentage>%)"`` for the array built from `values`."""
        flags = np.asarray(values)
        population = flags.size
        hits = flags.sum()
        ratio = hits * 100.0 / population
        return f"{int(hits)}/{int(population)} ({ratio}%)"

    def _mean(self, values, op=lambda x: x, dtype=np.float32):
        """
        Return `op` applied to the mean of the entries of `values` that are not None.

        When every entry is None (or `values` is empty), ``op(np.nan)`` is
        returned instead.
        """
        present = [x for x in values if x is not None]
        if not present:
            return op(np.nan)
        return op(np.mean(np.asarray(present, dtype=dtype)))

    def __str__(self):
        """Render with no base indentation and an increment of two spaces."""
        return self.to_string(0, 2)

    @abstractmethod
    def to_string(self, indent=0, increment=2):
        """Return a printable report; `indent` and `increment` are numbers of spaces."""
        pass

    @abstractmethod
    def __iadd__(self, other):
        """Accumulate `other` into this statistics object."""
        pass
class TopologyStatistics(HardwareStatistics):
    """
    Per-host hardware counters that can be accumulated with ``+=``.

    Each instance keeps lists of NUMA node counts, per-node memory, package,
    core and processing unit counts, plus OpenCL / CUDA availability flags
    and optional per-backend statistics objects.
    """

    def __init__(self, topo=None):
        """
        Create empty statistics, or statistics of one host when `topo` is given.

        `topo` is queried through ``machine()``, ``has_opencl()``,
        ``has_cuda()`` and its ``_opencl_backend`` / ``_cuda_backend``
        attributes.
        """
        self._count = 0
        self._numa_nodes = []
        self._memory_per_node = []
        self._packages = []
        self._physical_cores = []
        self._processing_units = []
        self._has_opencl = []
        self._has_cuda = []
        self._backend_statistics = {}

        if topo is None:
            return

        self._count += 1
        host = topo.machine()
        self._numa_nodes.append(host.numa_nodes_count())
        for numa_node in host.numa_nodes():
            self._memory_per_node.append(numa_node.local_memory())
            self._packages.append(numa_node.cpu_packages_count())
            self._physical_cores.append(numa_node.physical_cores_count())
            self._processing_units.append(numa_node.processing_units_count())

        self._has_opencl.append(topo.has_opencl())
        self._has_cuda.append(topo.has_cuda())

        # Backend statistics are only collected for backends that are present.
        if any(self._has_opencl):
            self._backend_statistics["opencl"] = topo._opencl_backend.stats()
        if any(self._has_cuda):
            self._backend_statistics["cuda"] = topo._cuda_backend.stats()

    def __iadd__(self, other):
        """
        Accumulate `other` (None, a Topology or a TopologyStatistics) in place.

        None is ignored, a Topology is first converted to statistics, and
        any other type raises TypeError.
        """
        if other is None:
            return self
        if isinstance(other, Topology):
            other = TopologyStatistics(topo=other)
        if not isinstance(other, TopologyStatistics):
            msg = "Unknown type {}, expected Topology or TopologyStatistics."
            raise TypeError(msg.format(type(other)))

        self._count += other._count
        self._numa_nodes.extend(other._numa_nodes)
        self._memory_per_node.extend(other._memory_per_node)
        self._packages.extend(other._packages)
        self._physical_cores.extend(other._physical_cores)
        self._processing_units.extend(other._processing_units)
        self._has_opencl.extend(other._has_opencl)
        self._has_cuda.extend(other._has_cuda)

        for backend, backend_stats in other._backend_statistics.items():
            if backend not in self._backend_statistics:
                self._backend_statistics[backend] = backend_stats
            else:
                self._backend_statistics[backend] += backend_stats
        return self

    def to_string(self, indent=0, increment=0):
        """
        Return the multi-line report, starting with a newline.

        `indent` is the number of leading spaces of the header line and
        `increment` the extra spaces of each detail line. OpenCL backend
        statistics, when present, are appended two increments deeper.
        """
        pad = " " * indent
        step = " " * increment
        nested_indent = indent + 2 * increment
        backends = self._backend_statistics

        opencl = ""
        if "opencl" in backends:
            opencl = "\n" + backends["opencl"].to_string(nested_indent, increment)

        # The CUDA report is built like the OpenCL one but is currently
        # not included in the returned message.
        cuda = ""
        if "cuda" in backends:
            cuda = "\n" + backends["cuda"].to_string(nested_indent, increment)

        rows = [
            ("NUMA nodes", self._total_minmax(self._numa_nodes)),
            (
                "Total memory",
                self._total_minmax(
                    self._memory_per_node, op=bytes2str, dtype=np.float32
                ),
            ),
            ("CPU packages", self._total_minmax(self._packages)),
            ("Physical cores", self._total_minmax(self._physical_cores)),
            ("Processing units", self._total_minmax(self._processing_units)),
        ]
        opencl_support = self._pct(self._has_opencl)

        header = (
            f"\n{pad}Collected hardware statistics about "
            f"{self._count} compute node(s):"
        )
        details = "".join(f"\n{pad}{step}{label}: {value}" for label, value in rows)
        footer = f"\n{pad}{step}OpenCL support: {opencl_support}{opencl}"
        return header + details + footer
class Topology(TopologyObject):
    """
    Root object of a parsed lstopo XML document.

    Holds the single Machine object found under the ``<topology>`` element
    and the logical device backends (currently only OpenCL) bound to it.
    """

    @classmethod
    @requires_cmd("lstopo")
    def parse(cls, pciids=None, override_cache=False):
        """
        Return the topology of the current host.

        The result is looked up in the cache file
        ``IO.cache_path() + "/hardware/topologies.pklz"`` under the key
        ``machine_id``. When nothing usable is cached, or when
        `override_cache` is true, ``lstopo`` is run with XML output, its
        output is parsed and the cache entry is updated.

        Parameters
        ----------
        pciids:
            PCI id database; a default ``PCIIds()`` is created when None.
        override_cache: bool
            Force a fresh ``lstopo`` run even if a cached topology exists.
        """
        if pciids is None:
            pciids = PCIIds()

        key = machine_id
        filepath = IO.cache_path() + "/hardware/topologies.pklz"

        topology = load_data_from_cache(filepath=filepath, key=key)
        if (topology is None) or (not isinstance(topology, Topology)) or override_cache:
            topology = subprocess.check_output(
                ["lstopo", "-l", "-v", "--no-caches", "--cpuset", "--of", "xml"]
            )
            topology = ElementTree.fromstring(topology)
            topology = Topology(parent=None, topo=topology, pciids=pciids)
            update_cache(filepath, key, topology)
        return topology

    def __init__(self, parent, topo, pciids):
        """Parse the ``<topology>`` XML element `topo`; see TopologyObject for `parent` and `pciids`."""
        # These must exist before the base constructor runs, because it
        # triggers _parse_object and _post_init.
        self._machine = None
        self._opencl_backend = None
        self._cuda_backend = None
        self._stats = None
        super().__init__(parent, topo, pciids)

    def _backend_report(self):
        """Return the printable OpenCL backend report, or a "not found" notice."""
        ss = ""
        # has_opencl is a method: it has to be called, a bare reference to
        # the bound method is always truthy.
        if self.has_opencl():
            ss += str(self._opencl_backend)
        else:
            ss += ":: OpenClBackend::"
            ss += "\n > OpenCL backend not found on this system."
        # CUDA backend reporting is not implemented yet.
        return ss

    def _post_init(self):
        """Look up logical devices once the XML tree has been parsed."""
        self._find_logical_devices()

    def machine(self):
        """Return the Machine object found in the topology (None if there was none)."""
        return self._machine

    def host_stats(self):
        """Return a new TopologyStatistics built from this topology."""
        return TopologyStatistics(self)

    def has_opencl(self):
        """Return True when an OpenCL backend has been bound."""
        return self._opencl_backend is not None

    def has_cuda(self):
        """Return True when a CUDA backend has been bound."""
        return self._cuda_backend is not None

    def _parsed_type(self):
        return "Topology"

    def _parse_object(self, it):
        """
        Build the Machine from a child ``<object type="Machine">`` element.

        Raises ValueError for any other object type. At most one Machine is
        expected.
        """
        # Imported here rather than at module level.
        # NOTE(review): presumably to avoid a circular import - confirm.
        from hysop.backend.hardware.machine import Machine

        _type = it.attrib["type"]
        if _type == "Machine":
            assert self._machine is None
            self._machine = Machine(self, it)
        else:
            msg = f"Unknown object type {_type} obtained during Topology parsing."
            raise ValueError(msg)

    def _find_logical_devices(self):
        """
        Look for logical devices exposed by various backends
        and bind them to physical devices
        """
        if __HAS_OPENCL_BACKEND__:
            from hysop.backend.device.opencl.opencl_hardware_backend import (
                OpenClBackend,
            )

            self._opencl_backend = OpenClBackend(hardware_topo=self.machine())
        else:
            self._opencl_backend = None

    def __str__(self):
        """Return the machine description followed by the backend report."""
        return f"{self.machine()}\n{self._backend_report()}"
if __name__ == "__main__":
    # Without a command line argument the default PCI id database is used;
    # otherwise the first argument is passed to PCIIds as `path`.
    pciids = PCIIds() if len(sys.argv) == 1 else PCIIds(path=sys.argv[1])
    print(Topology.parse(pciids))